--- title: Sensors keywords: fastai sidebar: home_sidebar summary: "Reads and packages auxiliary sensor (IMU/GPS) data over UART" description: "Reads and packages auxiliary sensor (IMU/GPS) data over UART" nb_path: "nbs/07_sensors.ipynb" ---
{% include tip.html content='This module can be imported using from openhsi.sensors import *' %}{% include warning.html content='Still experimental. Stay tuned.' %}
The pyserial code below reads and decodes the sensor data packets. It is adapted from one of my past projects.
Todo: Read the RTC timestamp at beginning of collection. The data packets only contain timestamp offsets.
while True:
GPIO.setmode(GPIO.BCM) # BCM pin-numbering scheme from Raspberry Pi
GPIO.setup(go_but, GPIO.IN)
if GPIO.input(go_but) == True:
#bounce
record()
else:
to_DF()
#save with time delta
remove event
while True:
GPIO.setmode(GPIO.BCM) # BCM pin-numbering scheme from Raspberry Pi
GPIO.setup(go_but, GPIO.IN)
if GPIO.input(go_but) == True:
camera ready
mount
p = Process(GPS?)
collect
save
else:
sleep
p.join
"sudo umount /dev/..."
os.system()
import serial
import sched, time
import pandas as pd
import numpy as np
import param
import panel as pn
import hvplot.pandas
import hvplot.streamz
import holoviews as hv
from holoviews.element.tiles import EsriImagery
from holoviews.selection import link_selections
from datashader.utils import lnglat_to_meters
from streamz.dataframe import PeriodicDataFrame
hv.extension('bokeh',logo=False)
from holoviews.streams import Pipe, Buffer
class SensorStream():
    """Read auxiliary sensor packets from a serial port and plot them live.

    Every decoded packet is appended to ``self.data`` (a list of rows) and
    mirrored in ``self.data_df`` (a DataFrame with the columns listed in
    ``_COLUMNS``). HoloViews ``Buffer`` streams feed the plots: location,
    satellites in view, temperature, pressure, humidity and the four IMU
    calibration levels. A Panel "Clear" button empties the plots and a
    "Start"/"Pause" button toggles whether ``run`` polls the port.
    """

    # Column order of one decoded packet, i.e. of one row of ``data_df``.
    _COLUMNS = ("lat", "lon", "sats", "temp", "pressure", "humidity",
                "sys_cal", "gyro_cal", "accel_cal", "mag_cal")

    # Packed (unaligned) payload layout that starts at byte 1 of a packet.
    # Native byte order is used, exactly as the previous ``ndarray.view``
    # based decoding did.
    _PACKET_DTYPE = np.dtype([("lat", "i4"), ("lon", "i4"), ("sats", "u1"),
                              ("temp", "f4"), ("pressure", "f4"),
                              ("humidity", "f4"), ("cal", "u1")])

    # Byte 0 plus the 22 payload bytes described by ``_PACKET_DTYPE``.
    _MIN_PACKET_LEN = 23

    def __init__(self, baudrate=115_200, port="/dev/cu.usbserial-DN05TVTD", buff_len: int = 100):
        """Open the serial port and build the streams, plots and widgets.

        Args:
            baudrate: Serial baud rate.
            port: Serial device path.
            buff_len: Maximum number of rows each plot buffer keeps.
        """
        self.ser = serial.Serial(port=port,
                                 baudrate=baudrate,
                                 bytesize=serial.EIGHTBITS,
                                 parity=serial.PARITY_NONE,
                                 stopbits=serial.STOPBITS_ONE)

        # Storage for decoded packets.
        self.data = []
        self.data_df = pd.DataFrame(None, columns=list(self._COLUMNS))
        # Number of ``data_df`` rows already pushed to the plot buffers.
        self._n_sent = 0

        # Location scatter: the first column (lon) is plotted on x and the
        # second (lat) on y, so the labels follow that order.
        self.loc_stream = Buffer(pd.DataFrame({"lon": [], "lat": []}), length=buff_len, index=False)
        self.loc_dmap = hv.DynamicMap(hv.Scatter, streams=[self.loc_stream]).opts(
            xlabel="longitude", ylabel="latitude")
        self.map_tiles = EsriImagery().opts(alpha=0.5, bgcolor='white')

        self.sat_stream = Buffer(pd.DataFrame({"sats": []}), length=buff_len)
        self.sat_dmap = hv.DynamicMap(hv.Curve, streams=[self.sat_stream]).opts(
            xlabel="index", ylabel="number of satellites in view")

        self.temp_stream = Buffer(pd.DataFrame({"temp": []}), length=buff_len)
        self.temp_dmap = hv.DynamicMap(hv.Curve, streams=[self.temp_stream]).opts(
            xlabel="index", ylabel="temperature (deg C)", toolbar=None)

        self.pressure_stream = Buffer(pd.DataFrame({"pressure": []}), length=buff_len)
        self.pressure_dmap = hv.DynamicMap(hv.Curve, streams=[self.pressure_stream]).opts(
            xlabel="index", ylabel="pressure (hPa)", toolbar=None)

        self.humidity_stream = Buffer(pd.DataFrame({"humidity": []}), length=buff_len)
        self.humidity_dmap = hv.DynamicMap(hv.Curve, streams=[self.humidity_stream]).opts(
            xlabel="index", ylabel="relative humidity %", toolbar=None)

        # IMU calibration levels, overlaid in one plot by ``__call__``.
        self.sys_stream = Buffer(pd.DataFrame({"sys_cal": []}), length=buff_len)
        self.sys_dmap = hv.DynamicMap(hv.Curve, streams=[self.sys_stream], label="system")
        self.gyro_stream = Buffer(pd.DataFrame({"gyro_cal": []}), length=buff_len)
        self.gyro_dmap = hv.DynamicMap(hv.Curve, streams=[self.gyro_stream], label="gyro")
        self.accel_stream = Buffer(pd.DataFrame({"accel_cal": []}), length=buff_len)
        self.accel_dmap = hv.DynamicMap(hv.Curve, streams=[self.accel_stream], label="accel")
        self.mag_stream = Buffer(pd.DataFrame({"mag_cal": []}), length=buff_len)
        self.mag_dmap = hv.DynamicMap(hv.Curve, streams=[self.mag_stream], label="mag")

        # Widgets.
        self.clear_btn = pn.widgets.Button(name='Clear', button_type='primary')
        self.msg = pn.widgets.StaticText(name="")
        self.clear_btn.on_click(self.clear_all)

        self.startstop_btn = pn.widgets.Button(name="Start", button_type="success")
        self.check_run = False
        self.startstop_btn.on_click(self.startstop)
        self.counter = 0

    def _column_streams(self):
        """Return (data_df column, Buffer stream) pairs for the curve plots."""
        return (("sats", self.sat_stream),
                ("temp", self.temp_stream),
                ("pressure", self.pressure_stream),
                ("humidity", self.humidity_stream),
                ("sys_cal", self.sys_stream),
                ("gyro_cal", self.gyro_stream),
                ("accel_cal", self.accel_stream),
                ("mag_cal", self.mag_stream))

    def run(self):
        """Poll the sensor once per second, forever.

        While ``check_run`` is set, each iteration reads the waiting packets,
        pushes them to the plots and bumps the counter shown in the message
        widget. The loop never returns; it only ends when an exception
        (for example ``KeyboardInterrupt``) propagates to the caller.

        NOTE(review): this loop blocks the calling thread, which is presumably
        why the Start/Pause button is reported as not working when ``run`` is
        called from a notebook - confirm before relying on the button.
        """
        while True:
            time.sleep(1)
            if not self.check_run:
                continue
            self.read()
            self.update()
            self.counter += 1
            self.msg.value = f"Running. Count = {self.counter}"

    def startstop(self, event):
        """Button callback: toggle polling and flip the button's look."""
        self.check_run = not self.check_run

        button = self.startstop_btn
        button.name = "Pause" if button.name == "Start" else "Start"
        button.button_type = "danger" if button.button_type == "success" else "success"

        if button.name == "Start":
            # Back to the idle state: report it and restart the counter.
            self.msg.value = "Paused"
            self.counter = 0
        else:
            self.msg.value = "Running"

    def __call__(self):
        """Assemble and return the servable Panel dashboard."""
        self.imu_stats = (self.sys_dmap * self.gyro_dmap * self.accel_dmap * self.mag_dmap).opts(
            ylabel="IMU calibration status", legend_position="left")
        location = self.loc_dmap.opts(hv.opts.Scatter(size=10)) * self.map_tiles
        return pn.Column(
            pn.Row(location, self.sat_dmap, self.imu_stats),
            pn.Row(self.temp_dmap, self.pressure_dmap, self.humidity_dmap),
            pn.Row(pn.Column(self.clear_btn, self.startstop_btn), self.msg),
        ).servable()

    def close(self):
        """Close the serial port."""
        self.ser.close()

    def clear_all(self, event):
        """Button callback: empty every plot buffer.

        The recorded packets in ``data`` / ``data_df`` are left untouched.
        """
        self.loc_stream.clear()
        for _, stream in self._column_streams():
            stream.clear()
        self.msg.value = f"Cleared {self.clear_btn.clicks} time(s)"

    def update(self):
        """Push the rows decoded since the previous call to the plot buffers.

        Only new rows are sent: a ``Buffer`` appends whatever it receives, so
        re-sending the whole table would duplicate earlier points and would
        repopulate the plots right after "Clear".
        """
        fresh = self.data_df.iloc[self._n_sent:]
        if fresh.empty:
            return

        # The tile layer is drawn in Web Mercator metres, not degrees.
        self.loc_stream.send(pd.concat(lnglat_to_meters(fresh["lon"], fresh["lat"]), axis=1))
        for column, stream in self._column_streams():
            stream.send(fresh[column].to_frame())

        self._n_sent = len(self.data_df)

    def read(self, timeout: float = 2):
        """Decode every line currently waiting on the serial port.

        Each accepted line contributes one row to ``self.data``, and
        ``self.data_df`` is rebuilt once at the end if anything was decoded.
        The last raw line read is kept in ``self.line_data``.

        Payload layout (byte offsets within the line, native byte order):
            1-4    int32    latitude  [1e-7 deg]
            5-8    int32    longitude [1e-7 deg]
            9      uint8    satellites in view and used in the solution
            10-13  float32  temperature [deg C]
            14-17  float32  pressure [hPa]
            18-21  float32  relative humidity [%]
            22     uint8    calibration levels, two bits each from the most
                            significant pair down: system, gyro, accel, mag

        Args:
            timeout: Seconds after which reading is abandoned and the input
                buffer is flushed.

        NOTE(review): ``readline`` splits on a newline byte, so a payload byte
        equal to 0x0A would cut a packet short - confirm how the firmware
        frames its packets.
        """
        start_time = time.time()
        got_packet = False

        while self.ser.inWaiting() > 0:
            self.line_data = line = self.ser.readline()

            # Checked before the completeness test so that a run of malformed
            # lines cannot keep the loop alive past the timeout.
            if time.time() - start_time > timeout:
                print("timeout")
                self.ser.flushInput()
                break

            # Packet not complete, skip it.
            if ord('*') not in line or len(line) < self._MIN_PACKET_LEN:
                continue

            record = np.frombuffer(line, dtype=self._PACKET_DTYPE, count=1, offset=1)[0]
            cal = int(record["cal"])
            self.data.append([
                float(record["lat"]) * 1e-7,   # latitude [deg]
                float(record["lon"]) * 1e-7,   # longitude [deg]
                int(record["sats"]),           # satellites in view
                float(record["temp"]),         # temperature [deg C]
                float(record["pressure"]),     # pressure [hPa]
                float(record["humidity"]),     # relative humidity [%]
                (cal >> 6) & 0b11,             # system calibration
                (cal >> 4) & 0b11,             # gyro calibration
                (cal >> 2) & 0b11,             # accel calibration
                cal & 0b11,                    # mag calibration
            ])
            got_packet = True

        if got_packet:
            self.data_df = pd.DataFrame(self.data, columns=list(self._COLUMNS))
# Open the serial port with the default baud rate and device path.
ss = SensorStream()
# Calling the instance builds and returns the servable Panel layout
# (plots plus the Clear and Start/Pause buttons).
ss()
{% include warning.html content='Start/Pause button not working!' %}
# Enter the polling loop; run() loops forever, so the lines below are only
# reached once it is interrupted.
ss.run()
# Release the serial port.
ss.ser.close()
# Table of every packet decoded so far.
ss.data_df
# Same class on the /dev/ttyTHS0 UART at 921600 baud.
peripherals = SensorStream(baudrate=921_600,port="/dev/ttyTHS0")
peripherals.run() # start on switch on, pause on switch off, KeyboardInterrupt to stop
peripherals = SensorStream(baudrate=921_600,port="/dev/ttyTHS0")
# NOTE(review): the SensorStream class shown in this file defines no `record`
# method, so this call would raise AttributeError - confirm whether it lives
# elsewhere or has not been written yet.
peripherals.record(max_timeout=2)
print(peripherals.data)
# NOTE(review): presumably a guard that stops automated runs at this
# hardware-only cell - confirm.
print("hardware")
raise ValueError("Hardware flag entered")